In [1]:
from __future__ import absolute_import
from __future__ import print_function
from tornado import gen
from flowz import app
from flowz.channels import *
from flowz.artifacts import *
In [2]:
def print_chans(*chans, **kwargs):
    # This is a bit more elaborate than before to resolve artifacts
    mode = kwargs.get('mode', 'get')
    func = kwargs.get('func', print)
    app.Flo([chan.map(lambda y: getattr(y, mode)()).each_ready().map(func) for chan in chans]).run()
A KeyedArtifact is an artifact that, in addition to wrapping another artifact, exposes a key associated with that artifact's value. That simple idea unlocks some powerful capabilities in flowz and is central to its most effective uses, particularly when you know the keys before you know the values.
For instance, suppose you have a bunch of daily data stored in an S3 bucket under keys that include the relevant date somewhere in their pattern. You could do one boto3 query to find all the S3 keys that exist, ferret out the date from each S3 key, and construct for each date a KeyedArtifact with that date as its key, wrapping an ExtantArtifact prepped with the full S3 key. You would then have, at the ready, an artifact for each date that could be lazily fetched if its value were needed, and the date key on each artifact would help you determine later whether you need to read it in at all. (This will become clearer later in this chapter.)
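The S3 scenario can be sketched in plain Python. This is a minimal illustration under stated assumptions, not flowz code. The S3 keys are hypothetical strings; in practice they might come from a single boto3 listing. `LazyKeyed` is a hypothetical stand-in for a KeyedArtifact wrapping an ExtantArtifact. `fake_fetch` is a hypothetical loader that stands in for reading an object's contents.

```python
import re
from datetime import date

# Hypothetical S3 object keys; a real list might come from one boto3 listing call.
S3_KEYS = [
    'reports/daily/2017-03-02/data.csv',
    'reports/daily/2017-03-01/data.csv',
    'reports/daily/README.txt',
    'reports/daily/2017-03-04/data.csv',
]
DATE_RE = re.compile(r'(\d{4})-(\d{2})-(\d{2})')


class LazyKeyed(object):
    """Hypothetical stand-in for a KeyedArtifact: a key plus a lazily loaded value."""

    def __init__(self, key, loader):
        self.key = key
        self._loader = loader
        self._loaded = False
        self._value = None

    @property
    def value(self):
        # Call the loader only on first access, then reuse the cached result.
        if not self._loaded:
            self._value = self._loader()
            self._loaded = True
        return self._value


def keyed_by_date(s3_keys, fetch):
    """Wrap each dated S3 key in a LazyKeyed, sorted by date; nothing is fetched here."""
    items = []
    for s3_key in s3_keys:
        match = DATE_RE.search(s3_key)
        if match is None:
            continue  # skip objects whose key carries no date
        key_date = date(*map(int, match.groups()))
        # Bind s3_key as a default argument so each loader fetches its own object.
        items.append(LazyKeyed(key_date, lambda k=s3_key: fetch(k)))
    items.sort(key=lambda item: item.key)
    return items


# Usage: record which objects actually get fetched.
fetched = []


def fake_fetch(s3_key):
    fetched.append(s3_key)
    return 'contents of ' + s3_key


artifacts = keyed_by_date(S3_KEYS, fake_fetch)
print([a.key.isoformat() for a in artifacts])  # ['2017-03-01', '2017-03-02', '2017-03-04']
print(len(fetched))  # 0: building the keyed list fetched nothing
recent = [a.value for a in artifacts if a.key >= date(2017, 3, 2)]
print(len(fetched))  # 2: only the dates that were actually needed
```

The default-argument binding in the lambda matters. Without it, every loader would close over the loop variable and fetch only the last S3 key.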
For now, here is a quick demonstration of finding the key by attribute and by index:
In [3]:
def lame_deriver(num):
    return num * 100
chan = IterChannel(KeyedArtifact(i, DerivedArtifact(lame_deriver, i)) for i in range(3))
tee = chan.tee()
app.Flo([chan.map(lambda a: print('key =', a.key, '; [0] =', a[0], '; [1] =', a[1]))]).run()
print('----')
print_chans(tee)
One thing KeyedArtifacts make possible is joining between two channels using the keys of the items in each channel.
flowz provides a CoGroupChannel that joins two channels, grouping the items by their keys in a very "cover all bases" approach that is reminiscent of a "full outer join" in SQL. Its default approach can be difficult to understand (though it is documented well in the docstring for CoGroupChannel), but it can be easily specialized.
A very useful specialization is supplied by the flowz.util.channel_inner_join(a, b) function, which will join together the items in each channel that have equivalent keys.
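To see what joining "items with equivalent keys" involves, here is a minimal pure-Python sketch of a sort-merge inner join. It is not flowz's implementation. Plain `(key, value)` tuples stand in for KeyedArtifacts. Each input is assumed to be already sorted by key, with no key repeated within one input. The single forward pass over both inputs is also why the ordering requirement in the note below matters.

```python
def sorted_inner_join(left, right):
    """Yield (left_item, right_item) pairs whose keys are equal.

    Sketch only: items are (key, value) tuples, each input must already be
    sorted by key, and no key may repeat within one input.
    """
    done = object()  # sentinel marking an exhausted input
    left_it, right_it = iter(left), iter(right)
    l_item = next(left_it, done)
    r_item = next(right_it, done)
    while l_item is not done and r_item is not done:
        if l_item[0] < r_item[0]:
            l_item = next(left_it, done)    # left key is behind; advance left
        elif r_item[0] < l_item[0]:
            r_item = next(right_it, done)   # right key is behind; advance right
        else:
            yield (l_item, r_item)          # keys match: emit the pair
            l_item = next(left_it, done)
            r_item = next(right_it, done)


div_2 = [(i, i) for i in range(1, 13) if i % 2 == 0]
div_3 = [(i, i * 10) for i in range(1, 13) if i % 3 == 0]
print(list(sorted_inner_join(div_2, div_3)))
# [((6, 6), (6, 60)), ((12, 12), (12, 120))]
```

Because it is a generator that only ever calls `next`, this sketch works on unbounded streams as well as lists.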
NOTE: It is important that the channels to be joined already present their objects in the relevant "sort" order for their keys. While sorting of channels is technically possible, it is not (yet) exposed as a first-class operation for various design reasons.
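A merge-style join moves forward through each input only once. Because of that, an out-of-order item can cause matches to be skipped without any error. One way to catch this early is to wrap an input in a check like the hypothetical `require_sorted_keys` below. It is a plain-Python sketch over `(key, value)` tuples, not a flowz operation.

```python
def require_sorted_keys(items, key=lambda item: item[0]):
    """Pass items through unchanged, raising ValueError if a key decreases.

    Hypothetical helper: equal adjacent keys are allowed; only a drop fails.
    """
    previous = None
    seen_any = False
    for item in items:
        current = key(item)
        if seen_any and current < previous:
            raise ValueError('key %r arrived after %r' % (current, previous))
        previous, seen_any = current, True
        yield item


print(list(require_sorted_keys([(2, 'a'), (4, 'b'), (4, 'c')])))
# [(2, 'a'), (4, 'b'), (4, 'c')]

try:
    list(require_sorted_keys([(2, 'a'), (6, 'b'), (3, 'c')]))
except ValueError as err:
    print('rejected: ' + str(err))  # rejected: key 3 arrived after 6
```

The check is lazy, so it fails at the first offending item rather than after consuming the whole input.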
Start with two channels, defined with keys that are divisible by 2 and 3, respectively:
In [4]:
chan_div_2 = IterChannel(KeyedArtifact(i, i) for i in range(1, 13) if i % 2 == 0)
chan_div_3 = IterChannel(KeyedArtifact(i, i*10) for i in range(1, 13) if i % 3 == 0)
NOTE: Those KeyedArtifacts use a bit of a trick where a scalar value is passed in as the artifact. This is not generally done in practice, and it is only mostly supported, but it is useful for this illustration.
In [5]:
print_chans(chan_div_2.tee())
print('----')
print_chans(chan_div_3.tee())
The normal print_chans() used in this guide just prints the fully resolved values of the artifacts, but you can visually infer the keys from the values.
This piece of code, however, shows their structures a bit more.
In [6]:
# Index each KeyedArtifact like a (key, artifact) pair
app.Flo([chan_div_2.tee().map(lambda ka: print('key =', ka[0], '; value =', ka[1].value))]).run()
print('----')
app.Flo([chan_div_3.tee().map(lambda ka: print('key =', ka[0], '; value =', ka[1].value))]).run()
Now, we can "cogroup" the channels and see what we get:
In [7]:
cogroup = chan_div_2.tee().cogroup(chan_div_3.tee())
app.Flo([cogroup.tee().map(print)]).run()
We now see that cogrouping produces tuples with artifacts from each of the two channels. If there were N channels involved, it would be an N-tuple. Curiously, though, the first tuple has a None from the second channel, and various artifacts appear multiple times. This code makes it clearer:
In [8]:
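# Show each cogrouped pair as ((key, value), (key, value)); the second slot may be None
app.Flo([cogroup.tee().map(lambda pair: print(((pair[0].key, pair[0].value), (pair[1].key, pair[1].value) if pair[1] is not None else None)))]).run()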
That is the "full outer join-ish" sort of grouping that cogroup does. (See the CoGroupChannel docstring for more information.)
A full outer join would include every element from each channel, with None in positions where a particular item doesn't have a match. For instance, the second element above would be (None, (3, 30)). This "cogrouping" essentially does the same thing, but instead of putting None, it reminds you what the last good value was from the other side.
Why might that reminding be useful? Suppose the keys were timestamps of two independent events that happen frequently, but randomly, during the day. You might never have two equivalent timestamps, but the cogrouping would let you easily find every point in time at which the "latest pair of values" changed.
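The difference between the two groupings can be sketched in plain Python. Both functions below are hypothetical illustrations of the semantics described above, not CoGroupChannel's implementation; its exact rules are in its docstring. `(key, value)` tuples stand in for KeyedArtifacts, and each side is assumed to hold each key at most once.

```python
def full_outer_join(left, right):
    """One row per key in either input; None marks the side lacking that key."""
    left_map, right_map = dict(left), dict(right)
    rows = []
    for k in sorted(set(left_map) | set(right_map)):
        rows.append(((k, left_map[k]) if k in left_map else None,
                     (k, right_map[k]) if k in right_map else None))
    return rows


def carry_forward_cogroup(left, right):
    """Like full_outer_join, but a missing side repeats its latest earlier item."""
    left_map, right_map = dict(left), dict(right)
    latest_left = latest_right = None  # None until a side has produced anything
    rows = []
    for k in sorted(set(left_map) | set(right_map)):
        if k in left_map:
            latest_left = (k, left_map[k])
        if k in right_map:
            latest_right = (k, right_map[k])
        rows.append((latest_left, latest_right))
    return rows


div_2 = [(i, i) for i in range(1, 13) if i % 2 == 0]
div_3 = [(i, i * 10) for i in range(1, 13) if i % 3 == 0]

print(full_outer_join(div_2, div_3)[:3])
# [((2, 2), None), (None, (3, 30)), ((4, 4), None)]
print(carry_forward_cogroup(div_2, div_3)[:3])
# [((2, 2), None), ((2, 2), (3, 30)), ((4, 4), (3, 30))]

# Keeping only rows whose two keys agree recovers the inner join.
matches = [row for row in carry_forward_cogroup(div_2, div_3)
           if row[0] is not None and row[1] is not None and row[0][0] == row[1][0]]
print(matches)  # [((6, 6), (6, 60)), ((12, 12), (12, 120))]
```

Each row of the carry-forward version marks a key at which at least one side's latest item changed. That property is what makes it handy for the timestamp scenario.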
We can constrain it with a filter to only those where the keys are equal:
In [9]:
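# Keep only pairs where both sides are present and their keys are equal
chan_div_6 = cogroup.tee().filter(lambda pair: pair[0] is not None and pair[1] is not None and pair[0].key == pair[1].key)
app.Flo([chan_div_6.tee().map(lambda pair: print(((pair[0].key, pair[0].value), (pair[1].key, pair[1].value) if pair[1] is not None else None)))]).run()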
Nice! And because that is so convenient, there is a function to do that for you:
In [10]:
from flowz.util import channel_inner_join
In [11]:
chan_div_6 = channel_inner_join(chan_div_2.tee(), chan_div_3.tee())
app.Flo([chan_div_6.tee().map(lambda pair: print(((pair[0].key, pair[0].value), (pair[1].key, pair[1].value) if pair[1] is not None else None)))]).run()